Data streams v2 #1985
Using sendText means that future optimizations to send a single data stream packet can be enabled.
…t / split into 15k MTU Doing this makes data arrive a bit faster since it's not waiting for compressed bytes to accumulate before sending out a "fully filled" packet
Tag each data stream chunk with a "compression index" which if > 0 points to a given DecompressionStream which should be used for preprocessing received bytes.
…s whole data stream
fflate allows flushing the compressed bytes midway through so that they
can be emitted "together" as a chunk in the output stream, which a
DecompressionStream('deflate-raw') can properly consume.
…nStream for single packet sends So now fflate is only used in the multi packet data stream case, and could be more easily broken out into its own concrete dependency block.
…ally make much of a difference for agent transcriptions And I don't think it would be too hard to re-introduce in the future as an opt in setting, either with fflate, or if compressionstream gets an explicit "flush" option
| /** The data-streams-v2 wire signals carried directly on the header: the compression flag and the | ||
| * inline single-packet payload. Both used to live in reserved header attributes; they are now | ||
| * first-class protobuf fields on `DataStream.Header`. */ | ||
| export interface StreamHeaderV2Fields { | ||
| /** Compression applied to the inline/chunked payload. Defaults to `NONE` when omitted. */ | ||
| compression?: DataStream_CompressionType; | ||
| /** The full payload smuggled into the header for single-packet (inline) sends. */ | ||
| inlineContent?: Uint8Array; | ||
| } |
Note to self: get rid of this and inline these two fields directly into the relevant function calls. Or name this something more generic if I decide to keep it.
| mimeType: 'text/plain', | ||
| timestamp: Date.now(), | ||
| topic: options?.topic ?? '', | ||
| size: totalTextLength, // NOTE: size is always the pre-compression byte length |
question: I'm not 100% sure this is the right call, making the data stream header size always the uncompressed byte length. This means effectively that the value could be larger than the literal compressed bytes sent, which is a little weird.
I did it this way because it is impossible to determine the compressed byte length before all of the data has been compressed. If this were the compressed length, you'd have to receive all file chunks, compress each one, and buffer everything in memory before you could determine the full length and finish the stream, which obviously isn't going to work.
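To illustrate the ordering problem described above, here is a minimal sketch. It assumes a Node.js environment and uses `node:zlib` as a stand-in for the SDK's actual compression path; the `SketchHeader` type is hypothetical and not part of the pull request.

```typescript
import { deflateRawSync } from 'node:zlib';

// Hypothetical header shape, for illustration only.
interface SketchHeader {
  size: number; // pre-compression byte length
}

const text = 'hello data streams '.repeat(200);
const encoded = new TextEncoder().encode(text);

// The uncompressed length is known as soon as the payload is encoded,
// so the header can be built before any compression happens.
const header: SketchHeader = { size: encoded.byteLength };

// The compressed length only exists once all of the input has been compressed.
const compressed = deflateRawSync(encoded);

console.log(header.size, compressed.byteLength);
```

For a one-shot text send this difference is harmless, but for a file read chunk by chunk the compressed total would only be available after the last chunk, which is too late for a header that is sent first.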
| // set text part of progress to 1 | ||
| handleProgress(1, 0); |
Note to self: add some tests for the onProgress stuff. I'm not sure I've fully exercised it to make sure I didn't break anything here. Though worth noting I did find some bugs where in some paths onProgress wasn't being called before so I'm actually not convinced it was working properly before either.
| // Incremental text streams are never compressed (CompressionStream does not support flushing | ||
| // mid-stream); one-shot compression lives in sendText. | ||
| // | ||
| // Note that a future streamText could send a context-takeover style deflate-raw stream with | ||
| // intermediate explicit `Z_SYNC_FLUSH`s - receivers already will handle this properly today. |
Note to reviewers - this is an important callout. The Outgoing- half of this change doesn't support compression for streamed data streams (only sendText / sendFile) but the infrastructure exists for this on the Incoming- half so this could be added in the future in a fully backwards compatible way without needing a new clientProtocol bump.
| // FIXME: make this a global event to ensure "max listener" warning won't get logged for lots of | ||
| // in flight data streams. | ||
| engine.once(EngineEvent.Closing, onEngineClose); |
Note to self: fix this pre-existing bug as part of this larger change. I think right now there will be a "max listener" warning if you try to send too many data streams concurrently.
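One possible shape for that fix is a single shared listener that fans out to per-stream callbacks. The sketch below assumes a Node.js `EventEmitter` as a stand-in for the engine; `SharedCloseNotifier` and the `'closing'` event name are hypothetical and not taken from the SDK.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical helper: fans one emitter listener out to many per-stream callbacks.
class SharedCloseNotifier {
  private callbacks = new Set<() => void>();

  constructor(emitter: EventEmitter, eventName: string) {
    // Exactly one listener is attached, no matter how many streams register.
    emitter.once(eventName, () => {
      const pending = [...this.callbacks];
      this.callbacks.clear();
      for (const cb of pending) cb();
    });
  }

  /** Registers a callback and returns a function that unregisters it. */
  register(cb: () => void): () => void {
    this.callbacks.add(cb);
    return () => {
      this.callbacks.delete(cb);
    };
  }
}

const engine = new EventEmitter(); // stand-in for the real engine
const notifier = new SharedCloseNotifier(engine, 'closing');

let closedStreams = 0;
for (let i = 0; i < 100; i++) {
  notifier.register(() => {
    closedStreams += 1;
  });
}

const listenersBeforeClose = engine.listenerCount('closing');
engine.emit('closing');
```

Because the emitter only ever sees one listener, the number of in-flight streams no longer affects the listener count that triggers the warning.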
| // Phase 1: Try to send as a single packet data stream | ||
| // | ||
| // This is not being done explicitly for files, because it's challenging to determine ahead of | ||
| // time how well the file contents will compress (and whether the total output will be under the | ||
| // MTU). Revisit this in the future though. |
Important callout for reviewers - sendFile has some special behavior (it doesn't do single packet data streams) because it's difficult / impossible to determine if the whole file will fit into a single packet without buffering in memory aggressively.
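For contrast, the single-packet decision for a payload that is already fully in memory can be sketched roughly as follows. `SendPlan`, `planSend`, and `ASSUMED_MTU_BYTES` are hypothetical names; the 15,000 byte budget is only loosely based on the "15k MTU" mentioned in the commit list, and header overhead is ignored.

```typescript
// Hypothetical plan type; the real SDK's internals may look quite different.
type SendPlan =
  | { kind: 'inline'; content: Uint8Array }
  | { kind: 'chunked'; chunks: Uint8Array[] };

// Assumed packet budget, loosely based on the "15k MTU" in the commit list.
const ASSUMED_MTU_BYTES = 15_000;

function planSend(payload: Uint8Array, mtu: number = ASSUMED_MTU_BYTES): SendPlan {
  // Phase 1: the whole payload fits in one packet, so it can ride along inline.
  if (payload.byteLength <= mtu) {
    return { kind: 'inline', content: payload };
  }
  // Phase 2: otherwise split into MTU-sized chunks (the last one may be shorter).
  const chunks: Uint8Array[] = [];
  for (let offset = 0; offset < payload.byteLength; offset += mtu) {
    chunks.push(payload.subarray(offset, offset + mtu));
  }
  return { kind: 'chunked', chunks };
}

const small = planSend(new Uint8Array(1_000));
const large = planSend(new Uint8Array(40_000));
```

Note that `planSend` needs the complete payload before it can choose a branch. That is cheap for text, but for a file it would mean reading (and possibly compressing) everything up front, which is the buffering cost the comment above is trying to avoid.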
| export async function deflateRawDecompress(data: Uint8Array): Promise<Uint8Array> { | ||
| const ds = new DecompressionStream('deflate-raw'); | ||
| const writer = ds.writable.getWriter(); | ||
| writer.write(data as NonSharedUint8Array); | ||
| writer.close(); | ||
| return collect(ds.readable); | ||
| } |
Note to reviewers: I opted to use deflate-raw instead of deflate or gzip because deflate-raw is the most compact representation, and the extra metadata that deflate includes (checksum and ordering) is superfluous in an SCTP-delivered reliable data channel context.
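The size difference between the three framings can be checked with a small sketch. This assumes Node.js and uses `node:zlib` rather than the browser `CompressionStream` API used in the pull request; the payload is made up for illustration.

```typescript
import { deflateRawSync, deflateSync, gzipSync } from 'node:zlib';

const payload = new TextEncoder().encode('agent transcription chunk '.repeat(50));

const raw = deflateRawSync(payload); // bare DEFLATE stream, no wrapper
const zlibWrapped = deflateSync(payload); // adds a 2-byte header and a 4-byte Adler-32 trailer
const gzipped = gzipSync(payload); // adds a 10-byte header and an 8-byte CRC-32 + length trailer

const zlibOverhead = zlibWrapped.byteLength - raw.byteLength;
const gzipOverhead = gzipped.byteLength - raw.byteLength;

console.log({ raw: raw.byteLength, zlibOverhead, gzipOverhead });
```

The fixed overhead is small in absolute terms, but it is paid on every compressed payload, so it matters most for the short single-packet sends this change targets.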
Note to reviewers: here is the (mostly complete) spec for this feature.

Warning
This change relies on some new SFU updates to be able to function. Check out main (really, livekit/livekit@86a79f8 or later), build + run the SFU locally, and point any test app at this local SFU for testing.
Overview
Data streams v2 consists of some updates to data streams to add three things:
This pull request contains the initial web implementation and initial data streams spec which will be used to propagate this across all other client sdks.
Performance
Together, 1 and 2 at least double ⚠️ data stream throughput across the spectrum of network connections and payload sizes.
Before:

After:
